
[core] Optimize ScheduleAndGrantLeases by snapshotting node availability per round #60800

Open
g199209 wants to merge 4 commits into ray-project:master from g199209:pr_optimize_schedule_and_grant

Conversation

@g199209
Contributor

@g199209 g199209 commented Feb 6, 2026

This PR optimizes the performance of ScheduleAndGrantLeases in the Raylet.

The Problem:
Previously, the scheduling policy performed a full liveness check for every candidate node, for every lease being scheduled. With a large number of leases (e.g., 2000+) and several dozen nodes (e.g., 50+), this added up to ~100k liveness checks per scheduling round. Each check went through an expensive path: StringIdMap::Get (mutex + string copy) -> NodeID::FromBinary (hash + allocation) -> GCS cache lookup (mutex). In reported cases this caused ScheduleAndGrantLeases to take more than 60 seconds, blocking the event loop.
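To make the cost concrete, here is a small illustrative program (not the actual Ray code; the loop bounds and names are assumptions) that mimics the shape of the pre-change call pattern: every pending lease triggers a scan of all candidate nodes, and every candidate triggers the liveness callback.

```cpp
// Illustrative only: mimics the shape of the pre-change hot path.
#include <cstdint>
#include <iostream>

int main() {
  const int kNumLeases = 2000;  // pending leases in the grant queue
  const int kNumNodes = 50;     // candidate nodes in the cluster
  int64_t liveness_checks = 0;

  // ScheduleAndGrantLeases: one pass over every pending lease.
  for (int lease = 0; lease < kNumLeases; ++lease) {
    // HybridSchedulingPolicy::ScheduleImpl: scans every candidate node.
    for (int node = 0; node < kNumNodes; ++node) {
      // NodeAvailable(node): StringIdMap::Get -> NodeID::FromBinary -> GCS cache lookup.
      ++liveness_checks;
    }
  }
  std::cout << liveness_checks << " liveness checks in one round\n";  // 100000
  return 0;
}
```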

The Fix:
Introduced a per-round node availability snapshot mechanism in ClusterResourceScheduler; a minimal sketch of the idea follows the list below.

  • At the start of a scheduling round, we snapshot the liveness of all nodes exactly once.
  • Subsequent NodeAvailable() calls within the same round use this O(1) snapshot.
  • This reduces the number of expensive liveness check paths from $O(N \times M)$ to $O(M)$, where $N$ is the number of leases and $M$ is the number of nodes.
  • Note: Node draining status is intentionally excluded from the snapshot and remains a live check to ensure immediate reaction to draining events.
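The following is a minimal sketch of the idea, not the actual ClusterResourceScheduler change: the class name, the NodeId alias, and the internal bookkeeping are assumptions, while BeginSchedulingRound/EndSchedulingRound/NodeAvailable and is_node_available_fn_ mirror the names used in this PR.

```cpp
// Minimal sketch of the per-round liveness snapshot (illustrative only).
#include <cstdint>
#include <functional>
#include <unordered_set>
#include <utility>
#include <vector>

using NodeId = int64_t;  // stand-in for the scheduling node ID type

class NodeAvailabilitySnapshot {
 public:
  explicit NodeAvailabilitySnapshot(std::function<bool(NodeId)> is_node_available_fn)
      : is_node_available_fn_(std::move(is_node_available_fn)) {}

  // Called at the start of a scheduling round. Reentrant: a nested round
  // (e.g. ClusterLeaseManager calling into LocalLeaseManager) reuses the
  // outer snapshot instead of rebuilding it.
  void BeginSchedulingRound(const std::vector<NodeId> &nodes) {
    if (round_depth_++ > 0) {
      return;
    }
    alive_nodes_.clear();
    for (NodeId node : nodes) {
      // The expensive liveness path runs exactly once per node per round.
      if (is_node_available_fn_(node)) {
        alive_nodes_.insert(node);
      }
    }
  }

  void EndSchedulingRound() {
    if (--round_depth_ == 0) {
      alive_nodes_.clear();
    }
  }

  // Inside a round: O(1) hash lookup. Outside a round: fall back to the live
  // check. Draining status would still be queried live, outside this class.
  bool NodeAvailable(NodeId node) const {
    if (round_depth_ > 0) {
      return alive_nodes_.count(node) > 0;
    }
    return is_node_available_fn_(node);
  }

 private:
  std::function<bool(NodeId)> is_node_available_fn_;
  std::unordered_set<NodeId> alive_nodes_;
  int round_depth_ = 0;
};
```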

Key Changes

  • Added BeginSchedulingRound() and EndSchedulingRound() to ClusterResourceScheduler to manage the snapshot lifecycle.
  • Modified NodeAvailable() to prioritize the snapshot for remote node liveness.
  • Wrapped the ScheduleAndGrantLeases loops in ClusterLeaseManager and LocalLeaseManager with the snapshot API (reentrant-safe); see the usage sketch after this list.
  • Added unit tests to verify call count reduction and live draining behavior.
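Building on the NodeAvailabilitySnapshot sketch above, the wrapping in the lease managers could look roughly like this (illustrative; Lease and the loop body are placeholders, not the real types):

```cpp
// Illustrative usage, continuing the NodeAvailabilitySnapshot sketch above.
struct Lease {};  // placeholder for the real lease/work item

void ScheduleAndGrantLeases(NodeAvailabilitySnapshot &scheduler,
                            const std::vector<NodeId> &nodes,
                            const std::vector<Lease> &pending_leases) {
  scheduler.BeginSchedulingRound(nodes);  // snapshot liveness once per round
  for (const Lease &lease : pending_leases) {
    (void)lease;
    // GetBestSchedulableNode() -> ... -> NodeAvailable() now hits the O(1)
    // snapshot instead of the GCS liveness path for every candidate node.
  }
  scheduler.EndSchedulingRound();  // drop the snapshot when the round ends
}
```

A scope guard (RAII) around the Begin/End pair would keep it balanced across early returns; whether the PR uses an explicit pair or a guard is not shown here.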

Test plan

  • New Unit Tests: Added to src/ray/raylet/scheduling/tests/cluster_resource_scheduler_test.cc (an illustrative sketch of the call-count check follows this list):
    • NodeAvailableSnapshotTest: Confirms is_node_available_fn_ is called only once per node per round.
    • NodeAvailableSnapshotReentrantTest: Confirms nested scheduling rounds (e.g., Cluster calling Local) work correctly.
    • NodeAvailableSnapshotDrainingTest: Confirms draining status is NOT cached and remains live.
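For illustration, a call-count check in the spirit of NodeAvailableSnapshotTest could look like the sketch below, written against the NodeAvailabilitySnapshot class above; the real tests use the actual ClusterResourceScheduler and its fixtures.

```cpp
#include <gtest/gtest.h>
#include <vector>

TEST(NodeAvailabilitySnapshotSketch, LivenessCheckedOncePerNodePerRound) {
  int liveness_calls = 0;
  std::vector<NodeId> nodes = {1, 2, 3};
  NodeAvailabilitySnapshot scheduler([&liveness_calls](NodeId) {
    ++liveness_calls;
    return true;  // every node reports alive
  });

  scheduler.BeginSchedulingRound(nodes);
  for (int lease = 0; lease < 100; ++lease) {  // many leases, same round
    for (NodeId node : nodes) {
      EXPECT_TRUE(scheduler.NodeAvailable(node));
    }
  }
  scheduler.EndSchedulingRound();

  // One expensive check per node per round, independent of the lease count.
  EXPECT_EQ(liveness_calls, 3);
}
```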

@g199209 g199209 requested a review from a team as a code owner February 6, 2026 06:20
Contributor

@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces a significant performance optimization for ScheduleAndGrantLeases by snapshotting node availability at the beginning of each scheduling round. The implementation is well-structured, introducing BeginSchedulingRound and EndSchedulingRound to manage the snapshot's lifecycle and making it reentrant-safe. The changes are well-tested with new unit tests covering the snapshotting logic, reentrancy, and interaction with node draining status. My review includes a couple of suggestions to improve const-correctness and robustness. Overall, this is a great improvement.


@cursor cursor bot left a comment


Cursor Bugbot has reviewed your changes and found 1 potential issue.

@ray-gardener ray-gardener bot added the community-contribution Contributed by the community label Feb 6, 2026
@Yicheng-Lu-llll
Member

Nice optimization! The snapshot approach is a clean win given that the Raylet event loop is single-threaded: the GCS liveness cache can't change mid-round, so caching it just eliminates provably redundant work.

Two questions:

  • 60 seconds seems surprisingly high. Could you share more details: profiling data, reproduction script?
  • Why exclude draining from the snapshot? Since the Raylet is single threaded, a node can't transition to draining mid-round either.

@g199209
Contributor Author

g199209 commented Feb 6, 2026

  • 60 seconds seems surprisingly high. Could you share more details: profiling data, reproduction script?
  • perf top:
     15.77%  raylet          [.] ray::NodeID::NodeID
     10.07%  raylet          [.] std::_Function_handler<bool (ray::BaseSchedulingID<(ray::SchedulingIDTag)0>), main::{lambda(ray::Status, std::optional<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > > const&)#5}::operat
      9.76%  raylet          [.] ray::StringIdMap::Get[abi:cxx11]
      8.17%  raylet          [.] absl::lts_20230802::Mutex::ReaderUnlock
      7.16%  raylet          [.] ray::MurmurHash64A
      6.56%  raylet          [.] ray::raylet_scheduling_policy::HybridSchedulingPolicy::ScheduleImpl
      6.38%  raylet          [.] ray::gcs::NodeInfoAccessor::IsNodeAlive
      4.60%  libjemalloc.so  [.] je_je_sdallocx_noflags
      4.40%  raylet          [.] ray::raylet_scheduling_policy::HybridSchedulingPolicy::IsNodeFeasible
  • pstack:
   Thread 1 (Thread 0x7f86eb65e4c0 (LWP 2120704)):
   #0  0x000055584b30454e in ray::raylet_scheduling_policy::HybridSchedulingPolicy::IsNodeFeasible(ray::BaseSchedulingID<(ray::SchedulingIDTag)0> const&, ray::raylet_scheduling_policy::HybridSchedulingPolicy::NodeFilter const&, ray::NodeResources const&, ray::ResourceRequest const&) const ()
   #1  0x000055584b305678 in ray::raylet_scheduling_policy::HybridSchedulingPolicy::ScheduleImpl(ray::ResourceRequest const&, float, bool, bool, ray::raylet_scheduling_policy::HybridSchedulingPolicy::NodeFilter, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, int, float) ()
   #2  0x000055584b306197 in ray::raylet_scheduling_policy::HybridSchedulingPolicy::Schedule(ray::ResourceRequest const&, ray::raylet_scheduling_policy::SchedulingOptions) ()
   #3  0x000055584b2bf257 in ray::raylet_scheduling_policy::CompositeSchedulingPolicy::Schedule(ray::ResourceRequest const&, ray::raylet_scheduling_policy::SchedulingOptions) ()
   #4  0x000055584b2bc603 in ray::ClusterResourceScheduler::GetBestSchedulableNode(ray::ResourceRequest const&, ray::rpc::SchedulingStrategy const&, bool, bool, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, long*, bool*) ()
   #5  0x000055584b2bdb77 in ray::ClusterResourceScheduler::GetBestSchedulableNode(absl::lts_20230802::flat_hash_map<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, double, absl::lts_20230802::container_internal::StringHash, absl::lts_20230802::container_internal::StringEq, std::allocator<std::pair<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const, double> > > const&, ray::LabelSelector const&, ray::rpc::SchedulingStrategy const&, bool, bool, bool, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, long*, bool*) ()
   #6  0x000055584b2bdf4f in ray::ClusterResourceScheduler::GetBestSchedulableNode(ray::LeaseSpecification const&, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, bool, bool, bool*) ()
   #7  0x000055584b0c9c86 in ray::raylet::LocalLeaseManager::TrySpillback(std::shared_ptr<ray::raylet::internal::Work> const&, bool&) ()
   #8  0x000055584b0caef9 in ray::raylet::LocalLeaseManager::GrantScheduledLeasesToWorkers() ()
   #9  0x000055584b0cc361 in ray::raylet::LocalLeaseManager::ScheduleAndGrantLeases() ()
   #10 0x000055584b2b547d in ray::raylet::ClusterLeaseManager::ScheduleAndGrantLeases() ()
   #11 0x000055584b184e50 in boost::asio::detail::wait_handler<ray::raylet::WorkerPool::MonitorStartingWorkerProcess(ray::WorkerID const&, ray::rpc::Language const&, ray::rpc::WorkerType)::{lambda(boost::system::error_code)#1}, boost::asio::any_io_executor>::do_complete(void*, boost::asio::detail::scheduler_operation*, boost::system::error_code const&, unsigned long) ()
   #12 0x000055584bc8333f in boost::asio::detail::scheduler::do_run_one(boost::asio::detail::conditionally_enabled_mutex::scoped_lock&, boost::asio::detail::scheduler_thread_info&, boost::system::error_code const&) ()
   #13 0x000055584bc85131 in boost::asio::detail::scheduler::run(boost::system::error_code&) ()
   #14 0x000055584bc856e1 in boost::asio::io_context::run() ()
   #15 0x000055584af568ec in main ()
   Thread 1 (Thread 0x7f86eb65e4c0 (LWP 2120704)):
   #0  0x00007f86ea51fc4e in __memmove_avx512_unaligned_erms () from /lib64/libc.so.6
   #1  0x000055584bd05d04 in std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >::_M_assign(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&) ()
   #2  0x000055584b4ff796 in ray::StringIdMap::Get[abi:cxx11](unsigned long) const ()
   #3  0x000055584af84f60 in std::_Function_handler<bool (ray::BaseSchedulingID<(ray::SchedulingIDTag)0>), main::{lambda(ray::Status, std::optional<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > > const&)#5}::operator()(ray::Status, std::optional<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > > const&) const::{lambda(ray::BaseSchedulingID<(ray::SchedulingIDTag)0>)#20}>::_M_invoke(std::_Any_data const&, ray::BaseSchedulingID<(ray::SchedulingIDTag)0>&&) ()
   #4  0x000055584b2bbb4e in ray::ClusterResourceScheduler::NodeAvailable(ray::BaseSchedulingID<(ray::SchedulingIDTag)0>) const ()
   #5  0x000055584b304562 in ray::raylet_scheduling_policy::HybridSchedulingPolicy::IsNodeFeasible(ray::BaseSchedulingID<(ray::SchedulingIDTag)0> const&, ray::raylet_scheduling_policy::HybridSchedulingPolicy::NodeFilter const&, ray::NodeResources const&, ray::ResourceRequest const&) const ()
   #6  0x000055584b305678 in ray::raylet_scheduling_policy::HybridSchedulingPolicy::ScheduleImpl(ray::ResourceRequest const&, float, bool, bool, ray::raylet_scheduling_policy::HybridSchedulingPolicy::NodeFilter, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, int, float) ()
   #7  0x000055584b306197 in ray::raylet_scheduling_policy::HybridSchedulingPolicy::Schedule(ray::ResourceRequest const&, ray::raylet_scheduling_policy::SchedulingOptions) ()
   #8  0x000055584b2bf257 in ray::raylet_scheduling_policy::CompositeSchedulingPolicy::Schedule(ray::ResourceRequest const&, ray::raylet_scheduling_policy::SchedulingOptions) ()
   #9  0x000055584b2bc603 in ray::ClusterResourceScheduler::GetBestSchedulableNode(ray::ResourceRequest const&, ray::rpc::SchedulingStrategy const&, bool, bool, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, long*, bool*) ()
   #10 0x000055584b2bdb77 in ray::ClusterResourceScheduler::GetBestSchedulableNode(absl::lts_20230802::flat_hash_map<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, double, absl::lts_20230802::container_internal::StringHash, absl::lts_20230802::container_internal::StringEq, std::allocator<std::pair<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const, double> > > const&, ray::LabelSelector const&, ray::rpc::SchedulingStrategy const&, bool, bool, bool, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, long*, bool*) ()
   #11 0x000055584b2bdf4f in ray::ClusterResourceScheduler::GetBestSchedulableNode(ray::LeaseSpecification const&, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, bool, bool, bool*) ()
   #12 0x000055584b0c9c86 in ray::raylet::LocalLeaseManager::TrySpillback(std::shared_ptr<ray::raylet::internal::Work> const&, bool&) ()
   #13 0x000055584b0caef9 in ray::raylet::LocalLeaseManager::GrantScheduledLeasesToWorkers() ()
   #14 0x000055584b0cc361 in ray::raylet::LocalLeaseManager::ScheduleAndGrantLeases() ()
   #15 0x000055584b2b547d in ray::raylet::ClusterLeaseManager::ScheduleAndGrantLeases() ()
   #16 0x000055584b184e50 in boost::asio::detail::wait_handler<ray::raylet::WorkerPool::MonitorStartingWorkerProcess(ray::WorkerID const&, ray::rpc::Language const&, ray::rpc::WorkerType)::{lambda(boost::system::error_code)#1}, boost::asio::any_io_executor>::do_complete(void*, boost::asio::detail::scheduler_operation*, boost::system::error_code const&, unsigned long) ()
   #17 0x000055584bc8333f in boost::asio::detail::scheduler::do_run_one(boost::asio::detail::conditionally_enabled_mutex::scoped_lock&, boost::asio::detail::scheduler_thread_info&, boost::system::error_code const&) ()
   #18 0x000055584bc85131 in boost::asio::detail::scheduler::run(boost::system::error_code&) ()
   #19 0x000055584bc856e1 in boost::asio::io_context::run() ()
   #20 0x000055584af568ec in main ()

How I found this problem

  1. Scheduling appeared stuck – Many tasks/actors stayed pending for a long time even though the cluster had idle CPU & GPU nodes and no new jobs were being submitted.
  2. Pending was concentrated on one node – A small script that consumed GCS get_all_resource_usage() and iterated ResourceUsageBatchData.batch showed that all “ready_queued” came from a single worker node (one node_id). Other nodes had zero in their Schedule/Infeasible/Waiting queues.
  3. That node had no usable workers – On that worker we checked raylet logs and metrics: Grant queue length ~2000, num_tasks_waiting_for_workers and num_waiting_for_remote_node_resources high, but num PYTHON workers: 0, num idle workers: 0, and process_failed_pending_registration in the thousands. Logs showed: “Some workers … have not registered within the timeout. The process is still alive, probably it's hanging during start.” So workers were started but never “registered” within the 60s timeout, and were killed – hence no workers to grant leases to and scheduling effectively stuck on that node.
  4. Raylet on that node was at 100% CPU – Despite almost no user tasks running there, the raylet process was pegging one core. That suggested the raylet was busy in its own logic rather than waiting on I/O or external events.
  5. perf & pstack pointed at the hot path – We took perf and pstack on that raylet. The hot path was: LocalLeaseManager::GrantScheduledLeasesToWorkers -> TrySpillback -> GetBestSchedulableNode -> HybridSchedulingPolicy::ScheduleImpl / IsNodeFeasible -> NodeAvailable (and inside that, the lambda calling StringIdMap::Get and NodeInfoAccessor::IsNodeAlive). So for every lease in the grant queue (~2000), the code was doing a full “best node” search over all nodes, and for each node it was calling the node-availability callback (GCS liveness + StringIdMap::Get).
  6. Conclusion – The 60s registration timeout looked high, but the real issue was that the raylet’s event loop was dominated by this scheduling path. The main thread was stuck in ScheduleAndGrantLeases (and thus never processing accept / RegisterClientRequest in time), so workers that had already connected were still seen as “not registered” when the 60s timer fired. So: redundant work in the scheduler (no cache for node liveness) → long-running scheduling rounds → event loop starvation → registration handler runs too late → workers killed by 60s timeout → no workers, more leases, more scheduling work.
  • Why exclude draining from the snapshot? Since the Raylet is single threaded, a node can't transition to draining mid-round either.

You're right, fixed.
